﻿#include "service_layer.hh"
#ifndef DISABLE_SB_CODE
#include "box/service_box.hh"
#include "lang_impl.hh"
#endif
#include "box/box_network.hh"
#include "config/box_config.hh"
#include "util/os_util.hh"
#include "util/string_util.hh"
#include "zookeeper/service_finder_zookeeper.hh"
#include <algorithm>
#include <climits>
#include <functional>

// Builds the layer on top of a service box. When a box is supplied (and the
// service-box code is compiled in) the finder, register and network handles
// are taken from it, and the balance type is read from the
// "service_layer.balance_type" configuration attribute if that is present.
kratos::service::ServiceLayer::ServiceLayer(ServiceBox *box) {
#ifndef DISABLE_SB_CODE
  box_ = box;
  if (!box_) {
    // No box: the collaborators can be supplied later via set_service_mach().
    return;
  }
  service_finder_ = box_->get_service_finder();
  service_register_ = box_->get_service_register();
  network_ = dynamic_cast<BoxNetwork *>(box_);
  const char *balance_key = "service_layer.balance_type";
  auto &&config = box_->get_config();
  if (config.has_attribute(balance_key)) {
    balance_type_ = config.get_string(balance_key);
  }
#else
  // Silence unused warnings when the service-box code is compiled out.
  (void)box;
  (void)box_;
#endif
}

// Nothing to release explicitly; members clean up after themselves.
kratos::service::ServiceLayer::~ServiceLayer() = default;

// Replaces the network, service finder and service register handles used by
// this layer with the ones supplied by the caller.
auto kratos::service::ServiceLayer::set_service_mach(
    BoxNetwork *network, ServiceFinder *service_finder,
    ServiceRegister *service_register) -> void {
  service_register_ = service_register;
  service_finder_ = service_finder;
  network_ = network;
}

// Handles a successfully established outgoing connection.
//
// `name` is the connector name, expected to be "{service name}-{host}";
// `channel_id` identifies the new channel. Every service that was waiting on
// this host gets the channel appended to its host list, and each registered
// callback that has not fired yet is invoked with a CONNECT event.
auto kratos::service::ServiceLayer::on_connect(const std::string &name,
                                               std::uint64_t channel_id)
    -> void {
  std::vector<std::string> parts;
  util::split(name, "-", parts);
  if (parts.size() != 2) {
#ifndef DISABLE_SB_CODE
    box_->write_log(lang::LangID::LANG_UNEXPECTED_ERROR,
                    klogger::Logger::FAILURE, "service_layer",
                    ("invalid connector name [" + name + "]").c_str());
#endif
    return;
  }
  const auto &host = parts[1];
  // Publish the new channel to every service queued on this host.
  for (const auto &service_name : connecting_host_map_[host]) {
    auto &service_hosts = service_map_[service_name];
    if (service_hosts.host_vec.empty()) {
      // First host for this service: restart the rotation index.
      service_hosts.index = 0;
    }
    service_hosts.host_vec.emplace_back(Host{host, channel_id});
#ifndef DISABLE_SB_CODE
    box_->write_log(lang::LangID::LANG_HOST_CONNECT_INFO,
                    klogger::Logger::INFORMATION, service_name.c_str(),
                    host.c_str(),
                    static_cast<int>(service_hosts.host_vec.size()));
#endif
    // When several service instances are reached through different channels,
    // this runs once per channel as each one is established.
    const auto cb_it = cb_map_.find(service_name);
    if (cb_it == cb_map_.end()) {
      continue;
    }
    for (auto &info : cb_it->second) {
      if (info.trigger) {
        continue; // already notified since the last close
      }
      info.cb(service_name, channel_id, info.cb_id,
              ServiceLayerEvent::CONNECT);
      info.trigger = true;
    }
  }
  // The host is connected now; drop its pending-connection record.
  connecting_host_map_.erase(host);
}

// Handles a closed channel.
//
// `name` is the connector name, expected to be "{service name}-{host}";
// `channel_id` identifies the closed channel. The channel is removed from
// every service's host list, each registered callback of an affected service
// is invoked with a CLOSE event and re-armed, and the disconnect is logged.
// Finally, the entry of the service named in `name` is dropped if it has no
// hosts left.
auto kratos::service::ServiceLayer::on_close(const std::string &name,
                                             std::uint64_t channel_id) -> void {
  for (auto &[service_name, host_info] : service_map_) {
    auto &hosts = host_info.host_vec;
    for (auto it = hosts.begin(); it != hosts.end();) {
      if (it->channel_id != channel_id) {
        ++it;
        continue;
      }
      it = hosts.erase(it);
      // When several service instances are reached through different
      // channels, this runs once per channel as each one closes.
      auto cb_it = cb_map_.find(service_name);
      if (cb_it != cb_map_.end()) {
        for (auto &info : cb_it->second) {
          info.cb(service_name, channel_id, info.cb_id,
                  ServiceLayerEvent::CLOSE);
          // Re-arm so the next successful connect notifies again.
          info.trigger = false;
        }
      }
#ifndef DISABLE_SB_CODE
      // Logged for every removed host, whether or not a callback is
      // registered, mirroring the unconditional log in on_connect().
      box_->write_log(lang::LangID::LANG_HOST_DISCONNECT_INFO,
                      klogger::Logger::INFORMATION, service_name.c_str(),
                      name.c_str(), static_cast<int>(hosts.size()));
#endif
    }
  }
  // Split the connector name to find the service this connector was for.
  std::vector<std::string> result;
  util::split(name, "-", result);
  if (result.size() != 2) {
#ifndef DISABLE_SB_CODE
    box_->write_log(lang::LangID::LANG_UNEXPECTED_ERROR,
                    klogger::Logger::FAILURE, "service_layer",
                    ("invalid connector name [" + name + "]").c_str());
#endif
    return;
  }
  // Drop the service entry once all of its connections are gone.
  auto it = service_map_.find(result[0]);
  if (it != service_map_.end() && it->second.host_vec.empty()) {
    service_map_.erase(it);
  }
}

auto kratos::service::ServiceLayer::get_channel(const std::string &service_name)
    -> std::uint64_t {
  auto real_name = get_real_name(service_name);
  auto root_it = service_map_.find(real_name);
  if (root_it == service_map_.end() || root_it->second.host_vec.empty()) {
    // 添加变化监听器
    service_finder_->add_listener(
        real_name,
        [&](const std::string &name, const std::vector<std::string> &hosts)
            -> void { service_listener(name, hosts); });
    // 发现并连接到指定host
    std::list<std::string> new_hosts;
    if (!service_finder_->find_service(real_name, new_hosts)) {
      return 0;
    }
    for (const auto &host : new_hosts) {
      connect_to_host(real_name, host);
    }
    return 0;
  } else {
    // 缓存内获取并轮询返回
    return get_channel_balance(root_it->second);
  }
}

// Reports whether `channel_id` is one of the channels currently recorded for
// `service_name`. Returns false when the service is unknown or has no hosts.
auto kratos::service::ServiceLayer::check_channel(
    const std::string &service_name, std::uint64_t channel_id) -> bool {
  const auto real_name = get_real_name(service_name);
  const auto entry = service_map_.find(real_name);
  if (entry == service_map_.end()) {
    return false;
  }
  const auto &hosts = entry->second.host_vec;
  return std::any_of(hosts.begin(), hosts.end(),
                     [channel_id](const auto &host) {
                       return host.channel_id == channel_id;
                     });
}

// Registers `cb` (tagged with `cb_id`, initially not triggered) for the
// service and then behaves like the single-argument get_channel().
auto kratos::service::ServiceLayer::get_channel(const std::string &service_name,
                                                ServiceEventCallback cb,
                                                std::uint32_t cb_id)
    -> std::uint64_t {
  const auto real_name = get_real_name(service_name);
  auto &callbacks = cb_map_[real_name];
  callbacks.push_back(CbInfo{cb, false, cb_id});
  return get_channel(real_name);
}

// Returns a channel for `service_name` from the already-connected hosts only.
// Unlike get_channel(), nothing is discovered or connected here; 0 is
// returned when the service is unknown or has no hosts.
auto kratos::service::ServiceLayer::try_get_channel(
    const std::string &service_name) -> std::uint64_t {
  const auto real_name = get_real_name(service_name);
  const auto entry = service_map_.find(real_name);
  const bool has_hosts =
      entry != service_map_.end() && !entry->second.host_vec.empty();
  if (!has_hosts) {
    return 0;
  }
  // Pick from the cached hosts using the configured balance strategy.
  return get_channel_balance(entry->second);
}

// Read-only access to the map of known services and their hosts.
auto kratos::service::ServiceLayer::get_remote_service() -> const ServiceMap & {
  return this->service_map_;
}

auto kratos::service::ServiceLayer::report_bad_channel(std::uint64_t channel_id)
    -> void {
  for (auto &[service_name, host_info] : service_map_) {
    for (const auto &host : host_info.host_vec) {
      if (host.channel_id == channel_id) {
        // 关闭失效管道, 失效的标准由逻辑层判断
        network_->close_channel(channel_id);
      }
    }
  }
}

auto kratos::service::ServiceLayer::set_balance_type(
    const std::string &balance_type) -> void {
  balance_type_ = balance_type;
}

// Service-finder callback: `hosts` is the host list reported for service
// `name`. Requests a connection to every reported host that is not already
// in the service's connected host list.
auto kratos::service::ServiceLayer::service_listener(
    const std::string &name, const std::vector<std::string> &hosts) -> void {
  std::vector<std::string> new_hosts;
  const auto root_it = service_map_.find(name);
  if (root_it == service_map_.end()) {
    // Nothing known about this service yet: every reported host is new.
    new_hosts = hosts;
  } else {
    const auto &host_vec = root_it->second.host_vec;
    for (const auto &host : hosts) {
      // A host is new only if NO connected host matches it. Each host is
      // considered exactly once, so it can be queued at most once per
      // occurrence in `hosts`, and an empty host_vec makes every host new.
      const bool already_connected =
          std::any_of(host_vec.begin(), host_vec.end(),
                      [&host](const auto &connected_host) {
                        return connected_host.host == host;
                      });
      if (!already_connected) {
        new_hosts.emplace_back(host);
      }
    }
  }
  // TODO: limit the number of connections to services of the same type.
  // Connect to the newly discovered hosts.
  for (const auto &host : new_hosts) {
    connect_to_host(name, host);
  }
}

auto kratos::service::ServiceLayer::connect_to_host(const std::string &name,
                                                    const std::string &host)
    -> bool {
  //
  // 对同一个主机的连接如果已经存在则不再发起新的连接请求, 记录连接请求
  //
  if (connecting_host_map_.find(host) != connecting_host_map_.end()) {
    connecting_host_map_[host].push_back(name);
    return true;
  }
  //
  // 检测是否已经存在连接
  //
  auto it = service_map_.find(name);
  if ((it != service_map_.end()) && !it->second.host_vec.empty()) {
    return true;
  }
  std::string ip;
  int port;
  // 获取配置
  if (!util::get_host_config(host, ip, port)) {
    // 配置格式错误
#ifndef DISABLE_SB_CODE
    box_->write_log(lang::LangID::LANG_BOX_SERVICE_ADDRESS_INCORRECT,
                    klogger::Logger::FAILURE, host.c_str(), name.c_str());
#endif
    return false;
  }
  auto timeout = network_->get_config().get_connect_other_box_timeout();
  // 连接到目标服务容器
  // NOTICE 这里的名字使用name-host
  if (!network_->connect_to(name + "-" + host, util::get_network_type(host), ip,
                            port, timeout)) {
#ifndef DISABLE_SB_CODE
    // 地址格式错误
    box_->write_log(lang::LangID::LANG_BOX_SERVICE_ADDRESS_INCORRECT,
                    klogger::Logger::FAILURE, host.c_str(), name.c_str());
#endif
    return false;
  }
  // 记录第一个发起连接的主机及服务
  connecting_host_map_[host].push_back(name);
  return true;
}

// Dispatches to the balance strategy named by balance_type_. "roundrobin" and
// "lru" select their own strategies; "random" and any other value fall back
// to random selection.
auto kratos::service::ServiceLayer::get_channel_balance(HostInfo &host_info)
    -> std::uint64_t {
  if (balance_type_ == "roundrobin") {
    return get_channel_balance_round_robin(host_info);
  }
  if (balance_type_ == "lru") {
    return get_channel_balance_lru(host_info);
  }
  return get_channel_balance_random(host_info);
}

// Picks the next host in rotation and returns its channel id, or 0 when the
// service has no hosts. The selected host's ref_count is incremented.
auto kratos::service::ServiceLayer::get_channel_balance_round_robin(
    HostInfo &host_info) -> std::uint64_t {
  auto &hosts = host_info.host_vec;
  if (hosts.empty()) {
    return 0;
  }
  // Wrap the STORED index, not just a local copy; otherwise it grows past the
  // end after one cycle and every later call would land on the first host.
  if (host_info.index >= hosts.size()) {
    host_info.index = 0;
  }
  // Bind by reference so the ref_count update reaches the stored host.
  auto &selected = hosts[host_info.index];
  host_info.index += 1;
  selected.ref_count += 1;
  return selected.channel_id;
}

// Picks a host via util::get_random_uint32 over [0, size - 1], bumps its
// ref_count and returns its channel id; returns 0 when there are no hosts.
auto kratos::service::ServiceLayer::get_channel_balance_random(
    HostInfo &host_info) -> std::uint64_t {
  auto &hosts = host_info.host_vec;
  if (hosts.empty()) {
    return 0;
  }
  const auto last_index = static_cast<std::uint32_t>(hosts.size() - 1);
  auto &picked = hosts[util::get_random_uint32(0, last_index)];
  picked.ref_count += 1;
  return picked.channel_id;
}

// Picks the host with the smallest ref_count (the first one on ties), bumps
// its ref_count and returns its channel id; returns 0 when there are no
// hosts.
auto kratos::service::ServiceLayer::get_channel_balance_lru(HostInfo &host_info)
    -> std::uint64_t {
  auto &hosts = host_info.host_vec;
  if (hosts.empty()) {
    return 0;
  }
  // min_element needs no sentinel value, so a host is always selected from a
  // non-empty list regardless of how large the counters are.
  const auto least_used =
      std::min_element(hosts.begin(), hosts.end(),
                       [](const auto &lhs, const auto &rhs) {
                         return lhs.ref_count < rhs.ref_count;
                       });
  least_used->ref_count += 1;
  return least_used->channel_id;
}
